Add copy-on-write mode for Iceberg row-level DELETE, UPDATE, and MERGE #28958
kaveti wants to merge 1 commit into trinodb:master from
Conversation
@coderabbitai review
✅ Actions performed: Review triggered.
📝 Walkthrough
This pull request implements copy-on-write (CoW) support for Iceberg row-level operations (DELETE, UPDATE, MERGE).

Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 ast-grep (0.42.1): plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Actionable comments posted: 7
🧹 Nitpick comments (2)
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java (1)
200-220: Consider centralizing this `IcebergMetadata` test fixture. This positional constructor block is now duplicated across the Nessie, REST, and Glue catalog tests, including the extra executor and trailing `null`. A small helper on `BaseTrinoCatalogTest` would make future signature changes much less error-prone.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java` around lines 200-220: Extract the duplicated IcebergMetadata positional constructor block into a single helper method on BaseTrinoCatalogTest (e.g., createTestIcebergMetadata or buildIcebergMetadataFixture) and update TestTrinoNessieCatalog and the REST and Glue catalog tests to call that helper; preserve the existing arguments used (PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, the UnsupportedOperationException file IO lambda, TABLE_STATISTICS_READER, new TableStatisticsWriter(new NodeVersion("test-version")), UNSUPPORTED_DELETION_VECTOR_WRITER, Optional.empty(), false, _ -> false, newDirectExecutorService(), directExecutor(), newDirectExecutorService(), newDirectExecutorService(), newDirectExecutorService(), 0, ZERO, null) so future constructor signature changes are localized to BaseTrinoCatalogTest.
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java (1)
625-642: Avoid hard-coding exact physical file layout in these CoW assertions.
`doesNotContainAnyElementsOf(filesBefore)`, `hasSize(3)`, `hasSize(2)`, and `containsAll(filesAfter)` assume one stable data file per write batch and a specific row-to-file distribution. Writer parallelism/scaling changes can break those checks without any CoW regression, so these tests would be less flaky if they asserted only logical invariants and touched-file behavior. Also applies to: 665-676
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java` around lines 625 - 642, In TestIcebergCopyOnWrite, avoid asserting exact physical layout (counts or specific membership) of data files: replace checks like hasSize(...), containsAll(...), and doesNotContainAnyElementsOf(filesBefore) with logical/touch-based assertions — e.g., call getDataFilePaths(tableName) before and after a DELETE/UPDATE and assert the resulting set is non-empty and not equal to the original set, and keep assertNoDeleteFiles(tableName) to ensure no delete files were produced; apply the same change for the similar block around lines 665-676 so tests assert only that files changed (set inequality) and remain non-empty rather than relying on specific file counts or exact file identity.
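The logical invariant the comment describes (files changed, table still has data, no exact counts) can be sketched with plain JDK sets; `CowFileInvariants` and its method are illustrative names, not helpers from the actual test class:

```java
import java.util.Set;

// Sketch of the suggested invariant check: after a CoW DELETE/UPDATE the
// data-file set must be non-empty and different from the original set,
// without pinning exact file counts or identities.
final class CowFileInvariants
{
    static void assertFilesRewritten(Set<String> filesBefore, Set<String> filesAfter)
    {
        if (filesAfter.isEmpty()) {
            throw new AssertionError("expected table to still contain data files");
        }
        if (filesAfter.equals(filesBefore)) {
            throw new AssertionError("expected CoW to replace data files, but file set is unchanged");
        }
    }
}
```

A test would call this with the results of `getDataFilePaths(tableName)` before and after the DML statement, keeping the existing `assertNoDeleteFiles(tableName)` check alongside it.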
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugin/trino-iceberg/COPY_ON_WRITE_DESIGN.md`:
- Around line 51-56: The design doc uses two executor names which creates
ambiguity—update the finishMerge() / finishCopyOnWrite() sections that currently
reference icebergScanExecutor so they consistently reference copyOnWriteExecutor
instead; replace any mention of icebergScanExecutor with copyOnWriteExecutor
(and note the DI tag `@ForIcebergCopyOnWrite` where relevant) so the threading
model matches the wiring described elsewhere.
In
`@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CopyOnWriteFileRewriter.java`:
- Around line 225-235: In CopyOnWriteFileRewriter's catch(Throwable t) (the
block that calls writer.rollback()), do not convert Errors or RuntimeExceptions
(including TrinoException) into a generic RuntimeException: after attempting
rollback, if t is an IOException wrap it in UncheckedIOException as before; if t
is an Error or RuntimeException rethrow it unchanged; for any other checked
Throwable (i.e., Throwable that is an Exception but not a RuntimeException) wrap
it in a RuntimeException with the same message including originalPath.
- Around line 101-113: The rewriteFile(...) method currently uses the single
IcebergFileFormat parameter for both reading source files and writing targets
and ignores any pre-existing delete files; update the method signature to accept
a separate source file format (e.g., IcebergFileFormat sourceFileFormat) and a
collection representing pre-existing delete files or a visible-row bitmap (e.g.,
List<DeleteFile> preExistingDeleteFiles or BitSet visibleRows) in addition to
the target write IcebergFileFormat fileFormat, then: when opening/reading the
original file use sourceFileFormat (not fileFormat) and apply
preExistingDeleteFiles/visibleRows to compute visible rows so deleted rows
aren’t resurrected, while still writing the output using fileFormat; propagate
these new parameters to all other affected call sites and to the other related
method ranges noted (around the blocks at 133-140, 192-214, 240-250) so callers
supply the source format and applicable delete files from the file’s
metadata/snapshots.
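The rethrow policy requested for the `catch (Throwable t)` block can be sketched with plain JDK types; the class and method names here are illustrative, not the actual CopyOnWriteFileRewriter code:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch of the requested rethrow policy after rollback:
// Errors and RuntimeExceptions propagate unchanged, IOExceptions become
// UncheckedIOException, and any other checked Throwable is wrapped once.
final class RethrowPolicy
{
    static RuntimeException propagate(Throwable t, String originalPath)
    {
        if (t instanceof Error error) {
            throw error; // never convert JVM errors
        }
        if (t instanceof RuntimeException runtimeException) {
            throw runtimeException; // preserves TrinoException and other runtime types
        }
        if (t instanceof IOException ioException) {
            throw new UncheckedIOException("Failed to rewrite file: " + originalPath, ioException);
        }
        throw new RuntimeException("Failed to rewrite file: " + originalPath, t);
    }
}
```

The key point is that wrapping a `RuntimeException` in another `RuntimeException` would strip error codes that callers (or Trino's error reporting) may rely on.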
In
`@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java`:
- Around line 3677-3780: The rewrite rollback tracking is incomplete because
rollbackActions are only added after future.get() succeeds in submission order
and are never executed if later commitUpdate()/commitTransaction() fails; fix by
collecting rollback actions as soon as each rewrite completes (use a
CompletionService or Future.get in completion order) and ensure all futures are
awaited (or cancelled) and their rollback actions gathered, then run/clear
rollbackActions in a single catch/finally that covers rewrite completion,
append/commit phases and commitTransaction failures; reference rewriteFutures,
rollbackActions, CopyOnWriteFileRewriter.RewriteResult, commitUpdate, and
commitTransaction to locate the code paths to change.
- Around line 3661-3667: The CoW path using RewriteFiles from
transaction.newRewrite() currently only calls validateFromSnapshot(...) and
scanManifestsWith(...); update the CoW commit path in IcebergMetadata to perform
the same row-level conflict checks used by the MOR path (see
finishWrite/RowDelta logic): apply conflictDetectionFilter(...) with the same
predicate, call validateNoConflictingDataFiles(...) when SERIALIZABLE isolation
is configured, and also call validateDeletedFiles(...) and
validateNoConflictingDeleteFiles() before committing the RewriteFiles; ensure
you reference the same transaction/newRewrite(), validateFromSnapshot(...), and
scanManifestsWith(...) sequence and add these additional validation calls to
prevent commits that would bypass row-level conflict detection.
- Around line 3649-3654: The CoW rewrite currently only passes new worker
deletion vectors (aggregatedDeletions) to CopyOnWriteFileRewriter.rewriteFile(),
which drops pre-existing delete semantics; fix by collecting existing delete
metadata from the original data file referenced by deletionTaskDetails (e.g.,
the DataFile / FileScanTask attached to the task) and include those
position-delete files and any equality-delete files when invoking
CopyOnWriteFileRewriter.rewriteFile() instead of passing List.of(); merge or
apply existing deletion vectors with DeletionVector.builder().addAll(...) so the
rewriter receives both existing and new deletions (use aggregatedDeletions,
deletionTaskDetails, DeletionVector.builder(), and
CopyOnWriteFileRewriter.rewriteFile() as anchors).
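The completion-order rollback collection suggested above can be sketched with a JDK `CompletionService`; `RewriteResult` and the rollback `Runnable` are simplified stand-ins for `CopyOnWriteFileRewriter.RewriteResult` and the real cleanup actions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;

// Simplified sketch: gather each rewrite's rollback action as soon as it
// completes (completion order, not submission order), and run all gathered
// rollbacks if any rewrite fails.
final class RewriteRollbackTracker
{
    record RewriteResult(String newFilePath, Runnable rollbackAction) {}

    static List<Runnable> awaitRewrites(ExecutorService executor, List<Callable<RewriteResult>> rewrites)
    {
        CompletionService<RewriteResult> completionService = new ExecutorCompletionService<>(executor);
        rewrites.forEach(completionService::submit);

        List<Runnable> rollbackActions = new ArrayList<>();
        try {
            for (int i = 0; i < rewrites.size(); i++) {
                // take() yields futures as they finish, so every completed
                // rewrite is tracked even if a later one fails
                rollbackActions.add(completionService.take().get().rollbackAction());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            rollbackActions.forEach(Runnable::run);
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            // one rewrite failed: undo every rewrite that already completed
            rollbackActions.forEach(Runnable::run);
            throw new RuntimeException("Rewrite failed; rolled back " + rollbackActions.size() + " completed rewrites", e.getCause());
        }
        return rollbackActions;
    }
}
```

In the real code path the caller would keep the returned rollback actions and run them (then clear them) if the subsequent `commitUpdate()`/`commitTransaction()` fails, covering the commit phase as well.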
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java`:
- Line 103: The test uses assertQuery("SELECT * FROM " + tableName, "SELECT *
FROM nation WHERE ...") but the session created in createQueryRunner() is pinned
to ICEBERG_CATALOG.tpch so unqualified "nation" resolves to the Iceberg catalog
and causes TABLE NOT FOUND; update the expected query strings in the assertQuery
calls (the ones at the shown locations and the other occurrences at the lines
noted) to fully qualify the reference to the TPCH source (e.g., tpch.tiny.nation
or the exact catalog.schema.table used by the TPCH setup) so the right catalog
is queried; search for the assertQuery invocations in TestIcebergCopyOnWrite and
replace bare "nation" references accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8e0e439c-5862-439e-92dd-1d05ff2b9a0c
📒 Files selected for processing (19)
- plugin/trino-iceberg/COPY_ON_WRITE_DESIGN.md
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CopyOnWriteFileRewriter.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ForIcebergCopyOnWrite.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeTableHandle.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
- plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java
- plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
- Fix CopyOnWriteFileRewriter: rethrow RuntimeException/Error unchanged in catch block
- Fix IcebergMetadata: move commit calls inside try block, collect all futures' rollback actions in catch
- Fix TestIcebergCopyOnWrite: replace exact file count assertions with logical invariants
- Centralize IcebergMetadata test fixture in BaseTrinoCatalogTest + clean up unused imports
- Update 4 catalog test subclasses to use centralized helper
Implement Copy-on-Write mode for DELETE, UPDATE, and MERGE operations in the Iceberg connector. When a table has write.delete.mode, write.update.mode, or write.merge.mode set to 'copy-on-write', the connector rewrites affected data files instead of writing position delete files.

Architecture:
- Workers emit serialized deletion vectors to the coordinator
- Coordinator aggregates deletions across workers per data file
- CopyOnWriteFileRewriter reads original files, filters deleted rows, and writes replacement files
- Iceberg RewriteFiles API atomically replaces old files with new ones

This coordinator-side rewrite approach correctly handles distributed execution where multiple workers may process different rows from the same data file.

Add parallel CoW file rewrite, comprehensive tests, and file-path verification
- Parallelize coordinator-side file rewrites using the icebergScanExecutor thread pool
- Add 24 comprehensive Copy-on-Write tests covering DELETE, UPDATE, MERGE, partitioned tables, sequential DML, complex expressions, subquery deletes, time travel, nullable columns, multiple data files, larger datasets, and more
- Add testCowReplacesOriginalDataFiles verifying original file paths are fully replaced after CoW operations (no delete files, no stale data files)
- Use ConcurrentLinkedQueue for thread-safe rollback action tracking

Add metrics and logging for CoW rewrite operations
- Add RewriteMetrics record to CopyOnWriteFileRewriter tracking per-file stats: original/new record counts, deleted rows, file sizes, rewrite duration
- Add per-file debug logging in CopyOnWriteFileRewriter
- Add aggregate info logging in finishCopyOnWrite summarizing total files rewritten/removed, rows deleted, bytes read/written, and insert count

Add configurable CoW concurrency, metrics tests, and design doc updates
- Add iceberg.copy-on-write-threads config property with dedicated executor
- Add ForIcebergCopyOnWrite binding annotation and executor provider
- Wire copyOnWriteExecutor through IcebergMetadataFactory to IcebergMetadata
- Add testCowMetricsFileCountAfterMultiFileDelete and testCowMetricsRowCountConsistency
- Update COPY_ON_WRITE_DESIGN.md with parallel rewrite config, metrics, and test coverage

Remove unused variable to fix error-prone UnusedVariable check

Add missing CopyOnWriteFileRewriter binding in LakehouseIcebergModule

Address review comments on CoW PR trinodb#28958
- Fix CopyOnWriteFileRewriter: rethrow RuntimeException/Error unchanged in catch block
- Fix IcebergMetadata: move commit calls inside try block, collect all futures' rollback actions in catch
- Fix TestIcebergCopyOnWrite: replace exact file count assertions with logical invariants
- Centralize IcebergMetadata test fixture in BaseTrinoCatalogTest + clean up unused imports
- Update 4 catalog test subclasses to use centralized helper

Fix high-priority CoW limitations: conflict detection, file format, pre-existing deletes
- Switch from RewriteFiles to OverwriteFiles API for full conflict detection (conflictDetectionFilter, validateNoConflictingData/Deletes matching the MoR path)
- Read source file format from file path extension instead of table default, fixing mixed-format table rewrites (separate sourceFileFormat/writeFileFormat)
- Merge pre-existing delete files into deletion vectors before CoW rewrite, preventing row resurrection when switching from MoR to CoW mode
- Add mergePreExistingDeletes to DeletionVectorWriter interface + implementation in DefaultDeletionVectorWriter (reuses existing manifest scanning logic)

Add comprehensive tests for CoW features: mixed format, pre-existing deletes, conflict detection. Tests added (16 new, 42 total):

Mixed file format (source format detection):
- testCowDeleteOnMixedFormatTable: ORC→PARQUET format change, CoW delete on ORC files
- testCowUpdateOnMixedFormatTable: CoW update on ORC data when table default is PARQUET
- testCowMergeOnMixedFormatTable: CoW merge on ORC data when table default is PARQUET

Pre-existing delete files (MoR→CoW mode switch):
- testCowAfterMorDeleteV2: v2 position deletes honored after CoW switch
- testCowAfterMorUpdateV2: MoR update deletes preserved through CoW update
- testCowAfterMorDeleteV3: v3 deletion vectors honored after CoW switch
- testCowAfterMultipleMorDeletes: accumulated MoR deletes all honored
- testCowMergeAfterMorDelete: MoR delete + CoW merge on same data file
- testCowOnPartitionedTableWithMorDeletesInDifferentPartitions
- testCowOnPartitionedTableWithMorDeletesInSamePartition
- testCowDeleteAllRowsFromFileWithMorDeletes: remaining rows after MoR
- testCowV3WithDvsAndMixedFormat: combined v3 DVs + ORC→PARQUET + CoW

Conflict detection (OverwriteFiles API):
- testCowWithConcurrentInsert: INSERT + CoW DELETE on different partitions
- testCowSnapshotIsolation: snapshot progression and time travel
- testCowProducesOverwriteSnapshot: verifies overwrite/append snapshot types
- testFullMorToCowMigration: end-to-end MoR→CoW migration workflow

Remove design doc from tracked files

Fix rollback tracking: clear rollback actions on successful commit. After commitTransaction succeeds, clear the rollbackActions queue so that committed files are not accidentally cleaned up. This also prevents duplicate cleanup if the catch block re-iterates already-processed futures when a commit failure occurs after all rewrites completed successfully.
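The "merge pre-existing delete files into deletion vectors" fix described above can be sketched with a JDK `BitSet` as the visibility bitmap; the real code uses Iceberg delete-file metadata and `DeletionVector.builder().addAll(...)`, so the types here are simplified stand-ins:

```java
import java.util.BitSet;
import java.util.List;

// Minimal sketch: fold pre-existing position deletes (from earlier MoR
// operations) and new worker deletions for one data file into a single
// deleted-positions bitmap, so rows deleted under MoR are not resurrected
// by the CoW rewrite.
final class DeletionMerge
{
    static BitSet mergeDeletedPositions(List<long[]> preExistingDeletes, long[] newDeletions)
    {
        BitSet deleted = new BitSet();
        for (long[] positions : preExistingDeletes) {
            for (long position : positions) {
                deleted.set(Math.toIntExact(position)); // sketch only: real deletion vectors handle 64-bit positions
            }
        }
        for (long position : newDeletions) {
            deleted.set(Math.toIntExact(position));
        }
        return deleted;
    }
}
```

The rewriter would then skip every row whose ordinal position is set in the merged bitmap while copying the file.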
Force-pushed from f6bef79 to 60bf7d4 (Compare)
@raunaqmorarka I have addressed all CodeRabbit review comments.
I read the "execution flow" in the description: I suspect the rewriting is happening on the coordinator?
@chenjian2664 I have updated the PR description and distributed the load to the workers. cc @raunaqmorarka
Force-pushed from b74e71d to e8178f1 (Compare)
Force-pushed from de8047a to 4f0fc65 (Compare)
Force-pushed from 470baa6 to 392566e (Compare)
@coderabbitai review
✅ Actions performed: Review triggered.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java (1)
447-502: ⚠️ Potential issue | 🟠 Major
Inconsistency: `copyOnWriteScan` and `tablePartitioning` excluded while other coordinator-only fields are included.
The `copyOnWriteScan` field (marked as coordinator-only) is excluded from `equals()` and `hashCode()`, which appears consistent with `tablePartitioning` (also coordinator-only). However, this is inconsistent with other coordinator-only fields: `recordScannedFiles`, `maxScannedFileSize`, and `forAnalyze` are all included in both `equals()` and `hashCode()` despite being marked as coordinator-only. Verify the rationale for the selective exclusion of only `tablePartitioning` and `copyOnWriteScan`, or align all coordinator-only fields to follow the same pattern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java` around lines 447 - 502, The equals() and hashCode() implementations are inconsistent about coordinator-only fields: either exclude all coordinator-only fields or include them all; choose the former (exclude coordinator-only fields). Update equals(Object) and hashCode() to remove recordScannedFiles, maxScannedFileSize, forAnalyze, copyOnWriteScan, and tablePartitioning from comparisons and from Objects.hash(...) so that only non-coordinator state (schemaName, tableName, tableType, snapshotId, tableSchemaJson, specId, partitionSpecJsons, formatVersion, unenforcedPredicate, enforcedPredicate, limit, projectedColumns, nameMappingJson, tableLocation, storageProperties, constraintColumns, etc.) are used; adjust the logic in equals() to no longer reference those coordinator-only fields and ensure hashCode() matches exactly the same set of fields.
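The "exclude all coordinator-only fields" option the prompt recommends can be illustrated with a cut-down handle; the field names are simplified stand-ins for IcebergTableHandle's, not the actual class:

```java
import java.util.Objects;

// Cut-down illustration of excluding coordinator-only state from identity:
// only fields that travel to workers participate in equals/hashCode, and
// the two methods must use exactly the same field set.
final class TableHandleSketch
{
    private final String schemaName;
    private final String tableName;
    private final boolean recordScannedFiles; // coordinator-only: excluded from identity

    TableHandleSketch(String schemaName, String tableName, boolean recordScannedFiles)
    {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.recordScannedFiles = recordScannedFiles;
    }

    @Override
    public boolean equals(Object o)
    {
        // recordScannedFiles deliberately omitted
        return o instanceof TableHandleSketch that
                && schemaName.equals(that.schemaName)
                && tableName.equals(that.tableName);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(schemaName, tableName); // must mirror equals() exactly
    }
}
```

Whichever convention is chosen, applying it uniformly avoids handles that compare unequal (or hash differently) purely because of coordinator-side bookkeeping.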
🧹 Nitpick comments (1)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java (1)
72-76: Add null and value validation to the `RewriteInfo` record. The nested `RewriteInfo` record lacks the defensive validation present in its parent `CommitTaskData`. Consider adding null checks and basic value validation for consistency with the codebase patterns:
♻️ Proposed validation

```diff
 public record RewriteInfo(
         String oldFilePath,
         long oldFileSizeInBytes,
         long oldRecordCount,
-        IcebergFileFormat oldFileFormat) {}
+        IcebergFileFormat oldFileFormat)
+{
+    public RewriteInfo
+    {
+        requireNonNull(oldFilePath, "oldFilePath is null");
+        requireNonNull(oldFileFormat, "oldFileFormat is null");
+    }
+}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java` around lines 72 - 76, The RewriteInfo record lacks the defensive validation used by CommitTaskData; add a compact record constructor in RewriteInfo that checks non-null for oldFilePath and oldFileFormat and validates oldFileSizeInBytes and oldRecordCount are non-negative (throwing NullPointerException for nulls and IllegalArgumentException for invalid numeric values) so the nested record enforces the same invariants as CommitTaskData.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5c36ec39-1bf3-4ead-936a-28e41a981c23
📒 Files selected for processing (24)
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CopyOnWriteFileRewriter.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeTableHandle.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DefaultDeletionVectorWriter.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWriteParquetConnectorTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWriteV3ParquetConnectorTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java
- plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java
- plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
✅ Files skipped from review due to trivial changes (4)
- plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
🚧 Files skipped from review as they are similar to previous changes (3)
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeTableHandle.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
👮 Files not reviewed due to content moderation or server errors (4)
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DefaultDeletionVectorWriter.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWriteV3ParquetConnectorTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWriteParquetConnectorTest.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Force-pushed 88d0c64 to fb6fab6
Validation:
Force-pushed 6a289cd to e650ab4
Implement Copy-on-Write (CoW) as an alternative to Merge-on-Read (MoR) for Iceberg row-level operations. CoW rewrites entire data files on the worker, producing clean files with no delete files, at the cost of higher write amplification.

Enabled via Iceberg table properties:

- write.delete.mode=copy-on-write
- write.update.mode=copy-on-write
- write.merge.mode=copy-on-write

Key design decisions:

- Workers rewrite files in parallel using a configurable thread pool (iceberg.copy-on-write-rewrite-threads, default: availableProcessors())
- Dangling deletion vectors are identified by workers and serialized in CommitTaskData, avoiding a full table scan on the coordinator during commit (matches Spark's danglingDVs() approach)
- Partition-scoped equality deletes are retained during CoW (cleaned up during compaction), matching Spark's behavior
- Insert-only CoW MERGE uses OverwriteFiles with conflict detection instead of AppendFiles
- File metadata (format, size, record count, data sequence number, first row ID) flows through the merge row ID block as RLE blocks, eliminating a redundant manifest scan (loadSourceDataFileInfos) on workers
- CoW splits use 2x weight to reflect read+write cost during scheduling

Made-with: Cursor
Force-pushed e650ab4 to 77fc9db
@ebyhr @chenjian2664 could you please review the changes? Let me know if I need to test or address any additional scenarios. Happy to work on this.
Description
Implement copy-on-write (CoW) mode for row-level operations in the Iceberg connector.
When a table sets `write.delete.mode`, `write.update.mode`, or `write.merge.mode` to `copy-on-write`, affected data files are rewritten without deleted rows instead of producing positional delete files.
Motivation
Iceberg supports two strategies for row-level modifications: merge-on-read (MoR), which writes delete files and reconciles them with data files at read time, and copy-on-write (CoW), which rewrites the affected data files at write time.
Read-heavy workloads (dashboards, BI, repeated scans) benefit significantly from CoW
because queries never need to reconcile delete files with data files at read time.
This is particularly valuable for:
- Tables where running `OPTIMIZE` is undesirable or insufficient

Spark, Flink, and PyIceberg already support CoW. This brings Trino to parity.
How it works
Users opt in per-operation via Iceberg table properties (no Trino config change needed):
Or at table creation:
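The exact SQL is elided above; conceptually, the connector maps these Iceberg property values to a per-operation mode. A minimal, self-contained sketch of that resolution (property names are from the Iceberg table spec; the class, method, and default-handling here are invented for illustration, not the PR's actual code):

```java
import java.util.Locale;
import java.util.Map;

// Illustrative only: resolve a per-operation write mode from Iceberg
// table properties, falling back to a caller-supplied default when the
// property is absent.
public class RowLevelModeResolver
{
    public enum Mode { COPY_ON_WRITE, MERGE_ON_READ }

    public static Mode resolve(Map<String, String> tableProperties, String propertyName, Mode defaultMode)
    {
        String value = tableProperties.get(propertyName);
        if (value == null) {
            return defaultMode;
        }
        return switch (value.toLowerCase(Locale.ROOT)) {
            case "copy-on-write" -> Mode.COPY_ON_WRITE;
            case "merge-on-read" -> Mode.MERGE_ON_READ;
            default -> throw new IllegalArgumentException("Unsupported write mode: " + value);
        };
    }
}
```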
Execution flow:
Planning: `beginMerge()` resolves the `RowLevelOperationMode` from table properties and marks the scan handle with `copyOnWriteScan=true`.

Split generation:
`IcebergSplitSource` skips file splitting for CoW scans; each data file becomes exactly one split, ensuring a single worker sees the complete file.
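The one-file-one-split rule can be sketched as follows (illustrative only; the real `IcebergSplitSource` logic works on Iceberg file scan tasks, and the method and class names here are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: for a copy-on-write scan each data file must become
// exactly one split so a single worker sees the complete file; otherwise
// files are carved into target-size chunks as usual.
public class SplitPlanner
{
    // Returns the [start, length] byte ranges generated for one file.
    public static List<long[]> planSplits(long fileSize, long targetSplitSize, boolean copyOnWriteScan)
    {
        List<long[]> splits = new ArrayList<>();
        if (copyOnWriteScan) {
            splits.add(new long[] {0, fileSize}); // whole file, single worker
            return splits;
        }
        for (long start = 0; start < fileSize; start += targetSplitSize) {
            splits.add(new long[] {start, Math.min(targetSplitSize, fileSize - start)});
        }
        return splits;
    }
}
```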
Partitioning: `getUpdateLayout()` returns a file-path-based partitioning handle (via `IcebergUpdateBucketFunction`) so all DELETE/UPDATE rows targeting the same data file are routed to the same merge-writer task. This is critical for correctness: without it, multiple workers could each rewrite the same file with incomplete deletion vectors, causing data duplication.
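The routing invariant reduces to hashing the target file path (illustrative only; the real `IcebergUpdateBucketFunction` operates on Trino blocks and is more involved). What matters is that every row targeting the same data file lands in the same bucket, so exactly one merge-writer task owns the complete deletion set for that file:

```java
// Illustrative only: deterministic file-path-to-bucket routing.
public class FilePathBucketFunction
{
    public static int bucket(String filePath, int bucketCount)
    {
        // floorMod keeps the bucket non-negative even for negative hash codes
        return Math.floorMod(filePath.hashCode(), bucketCount);
    }
}
```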
Worker-side rewriting: each `IcebergMergeSink` collects deletion vectors per file, merges any pre-existing MoR deletes (via `DeletionVectorWriter.mergePreExistingDeletes`), then calls `CopyOnWriteFileRewriter` to read the original file, filter out deleted rows page by page, and write surviving rows to a new data file. If all rows are deleted, the empty output file is cleaned up and no replacement is written.
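The core of the rewrite step can be sketched with a bit set over row positions (illustrative only; `CopyOnWriteFileRewriter` works on Trino pages, and the class and method names here are invented):

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Illustrative only: union the positions deleted by the current operation
// with any pre-existing merge-on-read deletes, then copy only surviving
// rows into the replacement file.
public class CopyOnWriteSketch
{
    public static <T> List<T> rewrite(List<T> rows, BitSet newDeletes, BitSet preExistingDeletes)
    {
        BitSet merged = (BitSet) newDeletes.clone();
        merged.or(preExistingDeletes); // union of old and new deleted positions
        List<T> survivors = new ArrayList<>();
        for (int position = 0; position < rows.size(); position++) {
            if (!merged.get(position)) {
                survivors.add(rows.get(position));
            }
        }
        // An empty result corresponds to the "all rows deleted" case: the
        // old file is dropped and no replacement file is written.
        return survivors;
    }
}
```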
Commit fragments: workers emit `CommitTaskData` with `RewriteInfo` (old file path, size, record count, format) for rewrite results, and plain DATA fragments for INSERT rows from UPDATE/MERGE.
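A simplified shape for that per-task payload (field and type names here are invented stand-ins, not the PR's actual `CommitTaskData`/`RewriteInfo` definitions):

```java
import java.util.Optional;

// Illustrative only: a rewrite fragment carries both the new file's
// metadata and the old file it replaces; a plain INSERT fragment has
// no rewrite target.
public record CommitFragment(
        String newFilePath,
        long newFileSizeInBytes,
        long newRecordCount,
        Optional<RewriteTarget> rewrite) // present only for rewrite results
{
    // The old file being replaced, as reported by the worker
    public record RewriteTarget(String oldFilePath, long oldFileSizeInBytes, long oldRecordCount, String format) {}

    public boolean isRewrite()
    {
        return rewrite.isPresent();
    }
}
```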
Coordinator commit: `IcebergMetadata.finishCopyOnWrite()` performs no file I/O; it only deserializes commit fragments and commits them via `OverwriteFiles` (delete old file + add new file) rather than `AppendFiles`, preserving conflict detection.
Additional context and related issues
Release notes
- [x] Release notes are required, with the following suggested text: